package org.adam2.sariel.dao.impl;

import org.adam2.sariel.constant.TableName;
import org.adam2.sariel.dao.BoardDao;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.data.hadoop.hbase.ResultsExtractor;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Repository
public class BoardDaoImpl extends AbstractDao implements BoardDao {

    /**
     * 插入Board对象
     *
     * @param rowName
     * @param familyName
     * @param qualifier
     * @param value
     */
    @Override
    public void put(String rowName, String familyName, String qualifier, byte[] value) {
        super.putBoard(rowName, familyName, qualifier, value);
    }

    /**
     * 批量加载Board对象
     *
     * @param mutationList
     */
    @Override
    public void batchPut(List<Mutation> mutationList) {
        super.batchPut(mutationList, TableName.BOARD);
    }

    /**
     * 根据rowName, familyName, qualifier返回board表中的一个value值
     *
     * @param rowName
     * @param familyName
     * @param qualifier
     * @return
     */
    public String getValueByRowNameAndFamilyAndQualifier(String rowName, String familyName, String qualifier) {
        return super.getValueByRowNameAndFamilyAndQualifier(rowName, familyName, qualifier);
    }

    /**
     * 根据qualifier的value，返回rowName(行键)
     *
     * @param qualifier
     * @param value
     * @return
     */
    @Override
    public String getRowNameByQualifier(String qualifier, final String value) {
        Filter filter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier)));
        Scan scan = new Scan();
        scan.setFilter(filter);
        return hbaseTemplate.find(TableName.BOARD, scan, new ResultsExtractor<String>() {
            @Override
            public String extractData(ResultScanner results) throws Exception {
                for (Result res : results) {
                    for (Cell cell : res.rawCells()) {
                        if (value.equals(Bytes.toString(CellUtil.cloneValue(cell)))) {
                            return Bytes.toString(cell.getRow());
                        }
                    }
                }
                return null;
            }
        });
    }

    /**
     * 通过开始行键和结束行键获取board表中的数据
     *
     * @param startRow
     * @param stopRow
     * @return
     */
    @Override
    public List<Map<String, Object>> find(String startRow, String stopRow) {
        Scan scan = new Scan();
        if (startRow == null) {
            startRow = "";
        }
        if (stopRow == null) {
            stopRow = "";
        }
        scan.setStartRow(Bytes.toBytes(startRow));
        scan.setStopRow(Bytes.toBytes(stopRow));

        return hbaseTemplate.find(TableName.BOARD, scan, new RowMapper<Map<String, Object>>() {
            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {

                List<Cell> ceList = result.listCells();
                Map<String, Object> map = new HashMap<String, Object>();
                String row = "";
                if (ceList != null && ceList.size() > 0) {
                    for (Cell cell : ceList) {
                        row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                        String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                        String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                        map.put(family + ":" + qualifier, value);
                    }
                    map.put("row", row);
                }
                return map;
            }
        });
    }

    public Map scan() {
        return hbaseTemplate.get(TableName.BOARD, "f8a54192657b4c47ab9710daa28a4a2f", new RowMapper<Map<String, Object>>() {
            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {

                List<Cell> ceList = result.listCells();
                Map<String, Object> map = new HashMap<String, Object>();
                if (ceList != null && ceList.size() > 0) {
                    for (Cell cell : ceList) {
                        System.out.println(Bytes.toString(cell.getFamilyArray()));
                        System.out.println(Bytes.toString(cell.getQualifierArray()));
                        System.out.println(Bytes.toString(cell.getValueArray()));
                        System.out.println(Bytes.toString(cell.getValue()));

                        map.put(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +
                                        "_" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                }
                return map;
            }
        });
    }
}
